package cn.uncode.rpc.transport.netty;

import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;

import cn.uncode.rpc.common.log.Logger;
import cn.uncode.rpc.common.log.LoggerFactory;
import cn.uncode.rpc.core.DefaultResponse;
import cn.uncode.rpc.core.Request;
import cn.uncode.rpc.core.RpcContext;
import cn.uncode.rpc.core.URLParam;
import cn.uncode.rpc.transport.RequestHandler;
import cn.uncode.rpc.util.NetUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * @author : juny.ey
 */
public class NettyServerHandler extends SimpleChannelInboundHandler<Request> {
	
	private static final Logger LOGGER = LoggerFactory.getLogger(NettyServerHandler.class);
	
	private ThreadPoolExecutor threadPoolExecutor;
	private RequestHandler requestHandler;
	
	public NettyServerHandler(ThreadPoolExecutor threadPoolExecutor, RequestHandler requestHandler) {
		this.threadPoolExecutor = threadPoolExecutor;
		this.requestHandler = requestHandler;
	}


	@Override
	protected void channelRead0(ChannelHandlerContext ctx, Request request) throws Exception {
		request.setAttachment(URLParam.HOST.getName(), NetUtil.getHostName(ctx.channel().remoteAddress()));

		final long processStartTime = System.currentTimeMillis();
		// 使用线程池方式处理
		try {
			threadPoolExecutor.execute(new Runnable() {
				@Override
                public void run() {
				    try{
				        RpcContext.init(request);
	                    processRequest(ctx, request, processStartTime);
				    }finally{
				        RpcContext.destroy();
				    }
                }
            });
		} catch (RejectedExecutionException rejectException) {
			DefaultResponse response = new DefaultResponse();
			response.setRequestId(request.getRequestId());
			response.setException(new NettyException("process thread pool is full, reject"));
			response.setProcessTime(System.currentTimeMillis() - processStartTime);
			ctx.writeAndFlush(response);
			LOGGER.debug("process thread pool is full, reject, active={} poolSize={} corePoolSize={} maxPoolSize={} taskCount={} requestId={}",
							threadPoolExecutor.getActiveCount(), threadPoolExecutor.getPoolSize(),
							threadPoolExecutor.getCorePoolSize(), threadPoolExecutor.getMaximumPoolSize(),
							threadPoolExecutor.getTaskCount(), request.getRequestId());
		}
		
	}
	
	
	private void processRequest(ChannelHandlerContext ctx, Request request, long processStartTime) {
		
		Object result = requestHandler.handle(request);

		DefaultResponse response = null;

		if (!(result instanceof DefaultResponse)) {
			response = new DefaultResponse(result);
		} else {
			response = (DefaultResponse) result;
		}

		response.setRequestId(request.getRequestId());
		response.setProcessTime(System.currentTimeMillis() - processStartTime);
		LOGGER. info(String.format("=====>send a request to channel <%s> success, type:%s", ctx.channel()));
		if (ctx.channel().isActive()) {
			ctx.writeAndFlush(response);
		}
	}
	
}
